package cn.stylefeng.roses.kernel.sync.modular.cc;

import cn.stylefeng.roses.kernel.sync.core.util.CustomSpringContextHolder;
import cn.stylefeng.roses.kernel.sync.modular.ew.base.AbstractEntryWrapper;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;

import java.util.List;

/**
 * Base class for canal clients.
 *
 * <p>Owns a single worker thread that connects to the canal server through the
 * configured {@link CanalConnector}, subscribes, pulls batches of entries and
 * hands every non-empty batch to {@link #processEntry(List)}.
 *
 * @author jianghang 2013-4-15 04:17:12 PM
 * @version 1.0.4
 */
public class AbstractCanalClient {

    protected static final Logger logger = LoggerFactory.getLogger(AbstractCanalClient.class);

    /** Pause, in milliseconds, between a failed connect/consume cycle and the next attempt. */
    private static final long RETRY_BACKOFF_MILLIS = 1000L;

    /** Loop flag polled by the worker; volatile so that the write in {@link #stop()} is seen by it. */
    protected volatile boolean running = false;

    /** Logs any throwable that escapes the worker thread. */
    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);

    /** Worker thread created by {@link #start()}; {@code null} until then. */
    protected Thread thread = null;

    protected CanalConnector connector;

    /** Value published under the MDC key {@code "destination"} while the worker is consuming. */
    protected String destination;

    /**
     * Creates a client without a connector; one must be supplied through
     * {@link #setConnector(CanalConnector)} before {@link #start()} is called.
     *
     * @param destination value stored in {@link #destination}
     */
    public AbstractCanalClient(String destination) {
        this(destination, null);
    }

    /**
     * Creates a client for the given destination and connector.
     *
     * @param destination value stored in {@link #destination}
     * @param connector   connector used by the worker thread; may be {@code null} and set later
     */
    public AbstractCanalClient(String destination, CanalConnector connector) {
        this.destination = destination;
        this.connector = connector;
    }

    /**
     * Replaces the connector used by this client.
     *
     * @param connector connector to use from now on
     */
    public void setConnector(CanalConnector connector) {
        this.connector = connector;
    }

    /**
     * Starts the canal client by launching a new worker thread that runs {@link #process()}.
     *
     * @throws IllegalArgumentException if no connector has been configured
     * @author fengshuonan
     * @Date 2019/1/16 7:06 PM
     */
    protected void start() {
        Assert.notNull(connector, "connector is null");
        thread = new Thread(this::process);

        thread.setUncaughtExceptionHandler(handler);
        // Raise the flag before the thread starts so the worker's first loop check sees it.
        running = true;
        thread.start();
    }

    /**
     * Stops the canal client: clears the running flag and waits for the worker thread to finish.
     * Does nothing when the client is not running.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status is restored for the caller.
     *
     * @author fengshuonan
     * @Date 2019/1/16 7:07 PM
     */
    protected void stop() {
        if (!running) {
            return;
        }
        running = false;

        Thread worker = thread;
        // A thread joining itself never returns, so skip the wait when stop() is
        // invoked from the worker (for example from within processEntry).
        if (worker != null && worker != Thread.currentThread()) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller instead of discarding it.
                Thread.currentThread().interrupt();
            }
        }

        MDC.remove("destination");
    }

    /**
     * Connects to the canal server, subscribes and consumes batches until {@link #running}
     * becomes {@code false}.
     *
     * <p>Each cycle connects, subscribes and then repeatedly fetches a batch without
     * acknowledging it, passes it to {@link #processEntry(List)} unless the batch id is
     * {@code -1} or the batch has no entries, and finally acknowledges the batch id. Any
     * exception ends the cycle: it is logged, the connector is disconnected and, after a
     * short pause, a new cycle begins.
     *
     * @author fengshuonan
     * @Date 2019/1/16 7:07 PM
     */
    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            boolean failed = false;
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                while (running) {
                    // Fetch up to batchSize entries without acknowledging them yet.
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    List<Entry> entries = message.getEntries();
                    if (batchId != -1 && !entries.isEmpty()) {
                        processEntry(entries);
                    }

                    // Acknowledge only after processing returned without throwing.
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                logger.error("processEntrys error!", e);
                failed = true;
            } finally {
                disconnectQuietly();
                MDC.remove("destination");
            }

            // Pause before reconnecting so a persistent failure does not spin this loop.
            if (failed && running && !pauseBeforeRetry()) {
                return;
            }
        }
    }

    /**
     * Disconnects the connector, logging instead of propagating any failure so that an
     * error during disconnect cannot terminate the worker's retry loop.
     */
    private void disconnectQuietly() {
        try {
            connector.disconnect();
        } catch (Exception e) {
            logger.error("disconnect error!", e);
        }
    }

    /**
     * Sleeps for {@link #RETRY_BACKOFF_MILLIS}.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted
     *         (the interrupt status is restored in that case)
     */
    private boolean pauseBeforeRetry() {
        try {
            Thread.sleep(RETRY_BACKOFF_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Handles the change entries pulled from the canal server by passing the whole list to
     * every {@link AbstractEntryWrapper} bean returned by
     * {@link CustomSpringContextHolder#getBeanOfType(Class)}. Does nothing when no such bean
     * is found.
     *
     * @param entrys entries of the batch currently being consumed
     * @author fengshuonan
     * @Date 2019/1/16 6:57 PM
     */
    protected void processEntry(List<Entry> entrys) {

        List<AbstractEntryWrapper> entryProcessors = CustomSpringContextHolder.getBeanOfType(AbstractEntryWrapper.class);

        if (entryProcessors == null || entryProcessors.isEmpty()) {
            return;
        }

        for (AbstractEntryWrapper entryProcessor : entryProcessors) {
            entryProcessor.processEntrys(entrys);
        }

    }
}